%%-*- mode: erlang -*-
%%--------------------------------------------------------------------
%% Bridges
%%--------------------------------------------------------------------
{mapping, "bridge.mqtt.$name.address", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.proto_ver", "emqx_bridge_mqtt.bridges", [
  {datatype, {enum, [mqttv3, mqttv4, mqttv5]}}
]}.

{mapping, "bridge.mqtt.$name.bridge_mode", "emqx_bridge_mqtt.bridges", [
  {default, false},
  {datatype, {enum, [true, false]}}
]}.

{mapping, "bridge.mqtt.$name.start_type", "emqx_bridge_mqtt.bridges", [
  {datatype, {enum, [manual, auto]}},
  {default, auto}
]}.

{mapping, "bridge.mqtt.$name.clientid", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.clean_start", "emqx_bridge_mqtt.bridges", [
  {default, true},
  {datatype, {enum, [true, false]}}
]}.

{mapping, "bridge.mqtt.$name.username", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.password", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.forwards", "emqx_bridge_mqtt.bridges", [
  {datatype, string},
  {default, ""}
]}.

{mapping, "bridge.mqtt.$name.forward_mountpoint", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.subscription.$id.topic", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.subscription.$id.qos", "emqx_bridge_mqtt.bridges", [
  {datatype, integer}
]}.

{mapping, "bridge.mqtt.$name.receive_mountpoint", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.ssl", "emqx_bridge_mqtt.bridges", [
  {datatype, flag},
  {default, off}
]}.

{mapping, "bridge.mqtt.$name.cacertfile", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.certfile", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.keyfile", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.ciphers", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.psk_ciphers", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.keepalive", "emqx_bridge_mqtt.bridges", [
  {default, "10s"},
  {datatype, {duration, s}}
]}.

{mapping, "bridge.mqtt.$name.tls_versions", "emqx_bridge_mqtt.bridges", [
  {datatype, string},
  {default, "tlsv1.3,tlsv1.2,tlsv1.1,tlsv1"}
]}.

{mapping, "bridge.mqtt.$name.reconnect_interval", "emqx_bridge_mqtt.bridges", [
  {default, "30s"},
  {datatype, {duration, ms}}
]}.

{mapping, "bridge.mqtt.$name.retry_interval", "emqx_bridge_mqtt.bridges", [
  {default, "20s"},
  {datatype, {duration, s}}
]}.

{mapping, "bridge.mqtt.$name.max_inflight_size", "emqx_bridge_mqtt.bridges", [
   {default, 0},
   {datatype, integer}
 ]}.

{mapping, "bridge.mqtt.$name.batch_size", "emqx_bridge_mqtt.bridges", [
  {default, 0},
  {datatype, integer}
]}.

{mapping, "bridge.mqtt.$name.queue.replayq_dir", "emqx_bridge_mqtt.bridges", [
  {datatype, string}
]}.

{mapping, "bridge.mqtt.$name.queue.replayq_seg_bytes", "emqx_bridge_mqtt.bridges", [
  {datatype, bytesize}
]}.

{mapping, "bridge.mqtt.$name.queue.max_total_size", "emqx_bridge_mqtt.bridges", [
  {datatype, bytesize}
]}.

{translation, "emqx_bridge_mqtt.bridges", fun(Conf) ->

    MapPSKCiphers = fun(PSKCiphers) ->
                        lists:map(
                          fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
                             ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
                             ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
                             ("PSK-RC4-SHA") -> {psk, rc4_128, sha}
                          end, PSKCiphers)
                    end,

    Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,

    IsSsl = fun(cacertfile)   -> true;
               (certfile)     -> true;
               (keyfile)      -> true;
               (ciphers)      -> true;
               (psk_ciphers)  -> true;
               (tls_versions) -> true;
               (_Opt)         -> false
            end,

    Parse = fun(tls_versions, Vers) ->
                    [{versions, [list_to_atom(S) || S <- Split(Vers)]}];
               (ciphers, Ciphers) ->
                    [{ciphers, Split(Ciphers)}];
               (psk_ciphers, Ciphers) ->
                    [{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}];
               (Opt, Val) ->
                    [{Opt, Val}]
            end,

    Merge = fun(forwards, Val, Opts) ->
                  [{forwards, string:tokens(Val, ",")}|Opts];
               (Opt, Val, Opts) ->
                  case IsSsl(Opt) of
                      true ->
                          SslOpts = Parse(Opt, Val) ++ proplists:get_value(ssl_opts, Opts, []),
                          lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
                      false ->
                          [{Opt, Val}|Opts]
                  end
            end,
    Queue = fun(Name) ->
                Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".queue", Conf),

                QOpts = [{list_to_atom(QOpt), QValue}|| {[_, _, _, "queue", QOpt], QValue} <- Configs],
                maps:from_list(QOpts)
            end,
    Subscriptions = fun(Name) ->
                        Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".subscription", Conf),
                        lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, _, "subscription", I, "topic"], Topic} <- Configs])],
                                  [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, _, "subscription", I, "qos"], QoS} <- Configs])])
                    end,
    IsNodeAddr = fun(Addr) ->
                      case string:tokens(Addr, "@") of
                          [_NodeName, _Hostname] -> true;
                           _ -> false
                      end
                 end,
    ConnMod = fun(Name) ->

                      [AddrConfig] = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".address", Conf),
                      {_, Addr} = AddrConfig,

                      Subs = Subscriptions(Name),
                      case IsNodeAddr(Addr) of
                          true when Subs =/= [] ->
                              error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs});
                          true ->
                              emqx_bridge_rpc;
                          false ->
                              emqx_bridge_mqtt
                      end
              end,

    %% to be backward compatible
    Translate =
        fun Tr(queue, Q, Cfg) ->
                NewQ = maps:fold(Tr, #{}, Q),
                Cfg#{queue => NewQ};
            Tr(address, Addr0, Cfg) ->
                Addr = case IsNodeAddr(Addr0) of
                           true -> list_to_atom(Addr0);
                           false -> Addr0
                       end,
                Cfg#{address => Addr};
            Tr(reconnect_interval, Ms, Cfg) ->
                Cfg#{reconnect_delay_ms => Ms};
            Tr(proto_ver, Ver, Cfg) ->
                Cfg#{proto_ver =>
                   case Ver of
                       mqttv3 -> v3;
                       mqttv4 -> v4;
                       mqttv5 -> v5;
                       _ -> v4
                   end};
            Tr(max_inflight_size, Size, Cfg) ->
                Cfg#{max_inflight => Size};
            Tr(Key, Value, Cfg) ->
                Cfg#{Key => Value}
         end,
    C = lists:foldl(
        fun({["bridge", "mqtt", Name, Opt], Val}, Acc) ->
                %% e.g #{aws => [{OptKey, OptVal}]}
                Init = [{list_to_atom(Opt), Val},
                        {connect_module, ConnMod(Name)},
                        {subscriptions, Subscriptions(Name)},
                        {queue, Queue(Name)}],
                maps:update_with(list_to_atom(Name), fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc);
           (_, Acc) -> Acc
        end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.mqtt", Conf))),
    C1 = maps:map(fun(Bn, Bc) ->
                      maps:to_list(maps:fold(Translate, #{}, maps:from_list(Bc)))
                  end, C),
    maps:to_list(C1)
end}.
